// SPDX-License-Identifier: GPL-3.0-or-later

#include "libnetdata/libnetdata.h"
#include "agent_cloud_link.h"

// Read from the config file -- new section [agent_cloud_link]
// Defaults are supplied
int aclk_recv_maximum = 0; // default 20
int aclk_send_maximum = 0; // default 20

int aclk_port = 0;          // default 1883
char *aclk_hostname = NULL; //default localhost
int aclk_subscribed = 0;

int aclk_metadata_submitted = 0;
int waiting_init = 1;
int cmdpause = 0; // Used to pause query processing

BUFFER *aclk_buffer = NULL;
char *global_base_topic = NULL;

int cloud_to_agent_parse(JSON_ENTRY *e)
{
    struct aclk_request *data = e->callback_data;

    switch(e->type) {
        case JSON_OBJECT:
            e->callback_function = cloud_to_agent_parse;
            break;
        case JSON_ARRAY:
            e->callback_function = cloud_to_agent_parse;
            break;
        case JSON_STRING:
            if (!strcmp(e->name, ACLK_JSON_IN_MSGID)) {
                data->msg_id = strdupz(e->data.string);
                break;
            }
            if (!strcmp(e->name, ACLK_JSON_IN_TYPE)) {
                data->type_id = strdupz(e->data.string);
                break;
            }
            if (!strcmp(e->name, ACLK_JSON_IN_TOPIC)) {
                data->topic = strdupz(e->data.string);
                break;
            }
            if (!strcmp(e->name, ACLK_JSON_IN_URL)) {
                data->url = strdupz(e->data.string);
                break;
            }
            break;
        case JSON_NUMBER:
            if (!strcmp(e->name, ACLK_JSON_IN_VERSION)) {
                data->version = atol(e->data.string);
                break;
            }
            break;

        case JSON_BOOLEAN:
            break;

        case JSON_NULL:
            break;
    }
    return 0;
}

//char *send_http_request(char *host, char *port, char *url, BUFFER *b)
//{
//    struct timeval timeout = { .tv_sec = 30, .tv_usec = 0 };
//
//    buffer_flush(b);
//    buffer_sprintf(
//        b,
//        "GET %s HTTP/1.1\r\nHost: %s\r\nAccept: plain/text\r\nAccept-Language: en-us\r\nUser-Agent: Netdata/rocks\r\n\r\n",
//        url, host);
//    int sock = connect_to_this_ip46(IPPROTO_TCP, SOCK_STREAM, host, 0, "443", &timeout);
//
//    if (unlikely(sock == -1)) {
//        error("Handshake failed");
//        return NULL;
//    }
//
//    SSL_CTX *ctx = security_initialize_openssl_client();
//    // Certificate chain: not updating the stores - do we need private CA roots?
//    // Calls to SSL_CTX_load_verify_locations would go here.
//    SSL *ssl = SSL_new(ctx);
//    SSL_set_fd(ssl, sock);
//    int err = SSL_connect(ssl);
//    SSL_write(ssl, b->buffer, b->len); // Timeout options?
//    int bytes_read = SSL_read(ssl, b->buffer, b->len);
//    SSL_shutdown(ssl);
//    close(sock);
//}

// Set when we have connection up and running from the connection callback
int aclk_connection_initialized = 0;
// TODO modify previous comment if this stays this way
// con_initialized means library is initialized and ready to be used
// acklk_connected means there is actually an established connection
int aclk_mqtt_connected = 0;

static netdata_mutex_t aclk_mutex = NETDATA_MUTEX_INITIALIZER;
static netdata_mutex_t query_mutex = NETDATA_MUTEX_INITIALIZER;

#define ACLK_LOCK netdata_mutex_lock(&aclk_mutex)
#define ACLK_UNLOCK netdata_mutex_unlock(&aclk_mutex)

#define QUERY_LOCK netdata_mutex_lock(&query_mutex)
#define QUERY_UNLOCK netdata_mutex_unlock(&query_mutex)

pthread_cond_t  query_cond_wait = PTHREAD_COND_INITIALIZER;
pthread_mutex_t query_lock_wait = PTHREAD_MUTEX_INITIALIZER;

#define QUERY_THREAD_LOCK pthread_mutex_lock(&query_lock_wait);
#define QUERY_THREAD_UNLOCK pthread_mutex_unlock(&query_lock_wait)
#define QUERY_THREAD_WAKEUP pthread_cond_signal(&query_cond_wait)


struct aclk_query {
    time_t created;
    time_t run_after; // Delay run until after this time
    char *topic;      // Topic to respond to
    char *data;       // Internal data (NULL if request from the cloud)
    char *msg_id;     // msg_id generated by the cloud (NULL if internal)
    char *query;      // The actual query
    u_char deleted;     // Mark deleted for garbage collect
    struct aclk_query *next;
};

struct aclk_query_queue {
    struct aclk_query *aclk_query_head;
    struct aclk_query *aclk_query_tail;
    u_int64_t count;
} aclk_queue = { .aclk_query_head = NULL, .aclk_query_tail = NULL, .count = 0 };

/*
 * Free a query structure when done
 */

void aclk_query_free(struct aclk_query *this_query)
{
    if (unlikely(!this_query))
        return;

    freez(this_query->topic);
    freez(this_query->query);
    if (this_query->data)
        freez(this_query->data);
    if (this_query->msg_id)
        freez(this_query->msg_id);
    freez(this_query);
    return;
}

// Returns the entry after which we need to create a new entry to run at the specified time
// If NULL is returned we need to add to HEAD
// Called with locked entries

struct aclk_query *aclk_query_find_position(time_t time_to_run)
{
    struct aclk_query *tmp_query, *last_query;

    last_query = NULL;
    tmp_query = aclk_queue.aclk_query_head;

    while (tmp_query) {
        if (tmp_query->run_after > time_to_run)
            return last_query;
        last_query = tmp_query;
        tmp_query = tmp_query->next;
    }
    return last_query;
}

// Need to have a lock before calling this
struct aclk_query *aclk_query_find(char *topic, char *data, char *msg_id, char *query)
{
    struct aclk_query *tmp_query;

    tmp_query = aclk_queue.aclk_query_head;

    while (tmp_query) {
        if (likely(!tmp_query->deleted)) {
            if (strcmp(tmp_query->topic, topic) == 0 && (strcmp(tmp_query->query, query) == 0)) {
                if ((!data || (data && strcmp(data, tmp_query->data) == 0)) &&
                    (!msg_id || (msg_id && strcmp(msg_id, tmp_query->msg_id) == 0)))
                    return tmp_query;
            }
        }
        tmp_query = tmp_query->next;
    }
    return NULL;
}

/*
 * Add a query to execute, the result will be send to the specified topic
 */

int aclk_queue_query(char *topic, char *data, char *msg_id, char *query, int run_after, int internal)
{
    struct aclk_query *new_query, *tmp_query;

    // Ignore all commands while we wait for the agent to initialize
    if (unlikely(waiting_init))
        return 0;

    run_after = now_realtime_sec() + run_after;

    QUERY_LOCK;
    tmp_query = aclk_query_find(topic, data, msg_id, query);
    if (unlikely(tmp_query)) {
        if (tmp_query->run_after == run_after) {
            QUERY_UNLOCK;
            QUERY_THREAD_WAKEUP;
            return 0;
        }
        tmp_query->deleted = 1;
    }

    new_query = callocz(1, sizeof(struct aclk_query));
    if (internal) {
        new_query->topic = strdupz(topic);
        new_query->query = strdupz(query);
    } else {
        new_query->topic = topic;
        new_query->query = query;
        new_query->msg_id = msg_id;
    }

    if (data)
        new_query->data = strdupz(data);

    new_query->next = NULL;
    new_query->created = now_realtime_sec();
    new_query->run_after = run_after;

    info("Added query (%s) (%s)", topic, query);

    tmp_query = aclk_query_find_position(run_after);

    if (tmp_query) {
        new_query->next = tmp_query->next;
        tmp_query->next = new_query;
        if (tmp_query == aclk_queue.aclk_query_tail)
            aclk_queue.aclk_query_tail = new_query;
        aclk_queue.count++;
        QUERY_UNLOCK;
        QUERY_THREAD_WAKEUP;
        return 0;
    }

    new_query->next = aclk_queue.aclk_query_head;
    aclk_queue.aclk_query_head = new_query;
    aclk_queue.count++;

    QUERY_UNLOCK;
    QUERY_THREAD_WAKEUP;
    return 0;

//    if (likely(aclk_queue.aclk_query_tail)) {
//        aclk_queue.aclk_query_tail->next = new_query;
//        aclk_queue.aclk_query_tail = new_query;
//        aclk_queue.count++;
//        QUERY_UNLOCK;
//        return 0;
//    }
//
//    if (likely(!aclk_queue.aclk_query_head)) {
//        aclk_queue.aclk_query_head = new_query;
//        aclk_queue.aclk_query_tail = new_query;
//        aclk_queue.count++;
//        QUERY_UNLOCK;
//        return 0;
//    }
//    QUERY_UNLOCK;
//    return 0;
}

inline int aclk_submit_request(struct aclk_request *request)
{
    return aclk_queue_query(request->topic, NULL, request->msg_id, request->url, 0, 0);
}

/*
 * Get the next query to process - NULL if nothing there
 * The caller needs to free memory by calling aclk_query_free()
 *
 *      topic
 *      query
 *      The structure itself
 *
 */
struct aclk_query *aclk_queue_pop()
{
    struct aclk_query *this_query;

    QUERY_LOCK;

    if (likely(!aclk_queue.aclk_query_head)) {
        QUERY_UNLOCK;
        return NULL;
    }

    this_query = aclk_queue.aclk_query_head;

    if (this_query->run_after > now_realtime_sec()) {
        info("Query %s will run in %ld seconds", this_query->query, this_query->run_after - now_realtime_sec());
        QUERY_UNLOCK;
        return NULL;
    }

    aclk_queue.count--;
    aclk_queue.aclk_query_head = aclk_queue.aclk_query_head->next;

    if (likely(!aclk_queue.aclk_query_head)) {
        aclk_queue.aclk_query_tail = NULL;
    }

    QUERY_UNLOCK;
    return this_query;
}

// This will give the base topic that the agent will publish messages.
// subtopics will be sent under the base topic e.g.  base_topic/subtopic
// This is called by aclk_init(), to compute the base topic once and have
// it stored internally.
// Need to check if additional logic should be added to make sure that there
// is enough information to determine the base topic at init time

// TODO: Locking may be needed, depends on the calculation of the base topic and also if we need to switch
// that on the fly

char *get_publish_base_topic(PUBLISH_TOPIC_ACTION action)
{
    static char *topic = NULL;

    if (unlikely(!is_agent_claimed()))
        return NULL;

    ACLK_LOCK;

    if (unlikely(action == PUBLICH_TOPIC_FREE)) {
        if (likely(topic)) {
            freez(topic);
            topic = NULL;
        }

        ACLK_UNLOCK;

        return NULL;
    }

    if (unlikely(action == PUBLICH_TOPIC_REBUILD)) {
        ACLK_UNLOCK;
        get_publish_base_topic(PUBLICH_TOPIC_FREE);
        return get_publish_base_topic(PUBLICH_TOPIC_GET);
    }

    if (unlikely(!topic)) {
        char tmp_topic[ACLK_MAX_TOPIC + 1];

        sprintf(tmp_topic, ACLK_TOPIC_STRUCTURE, is_agent_claimed());
        topic = strdupz(tmp_topic);
    }

    ACLK_UNLOCK;
    return topic;
}

char *get_topic(char *sub_topic, char *final_topic, int max_size)
{
    if (unlikely(!global_base_topic))
        global_base_topic = GET_PUBLISH_BASE_TOPIC;

    if (unlikely(!global_base_topic))
        return sub_topic;

    snprintfz(final_topic, max_size, "%s/%s", global_base_topic, sub_topic);

    return final_topic;
}

// Wait for ACLK connection to be established
int aclk_wait_for_initialization()
{
    if (unlikely(!aclk_connection_initialized)) {
        time_t now = now_realtime_sec();

        while (!aclk_connection_initialized && (now_realtime_sec() - now) < ACLK_INITIALIZATION_WAIT) {
            sleep_usec(USEC_PER_SEC * ACLK_INITIALIZATION_SLEEP_WAIT);
            _link_event_loop(0);
        }

        if (unlikely(!aclk_connection_initialized)) {
            error("ACLK connection cannot be established");
            return 1;
        }
    }
    return 0;
}

/*
 * This function will fetch the next pending command and process it
 *
 */
int aclk_process_query()
{
    struct aclk_query *this_query;
    static u_int64_t query_count = 0;
    //int rc;

    if (unlikely(cmdpause))
        return 0;

    if (!aclk_connection_initialized)
        return 0;

    this_query = aclk_queue_pop();
    if (likely(!this_query)) {
        //info("No pending queries");
        return 0;
    }

    if (unlikely(this_query->deleted)) {
        info("Garbage collect query %s:%s", this_query->topic, this_query->query);
        aclk_query_free(this_query);
        return 1;
    }

    query_count++;
    info(
        "Query #%d (%s) (%s) in queue %d seconds", (int) query_count, this_query->topic, this_query->query,
        (int) (now_realtime_sec() - this_query->created));

    if (strncmp((char *)this_query->query, "/api/v1/", 8) == 0) {
        struct web_client *w = (struct web_client *)callocz(1, sizeof(struct web_client));
        w->response.data = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
        strcpy(w->origin, "*"); // Simulate web_client_create_on_fd()
        w->cookie1[0] = 0;      // Simulate web_client_create_on_fd()
        w->cookie2[0] = 0;      // Simulate web_client_create_on_fd()
        w->acl = 0x1f;

        char *mysep = strchr(this_query->query, '?');
        if (mysep) {
            strncpyz(w->decoded_query_string, mysep, NETDATA_WEB_REQUEST_URL_SIZE);
            *mysep = '\0';
        } else
            strncpyz(w->decoded_query_string, this_query->query, NETDATA_WEB_REQUEST_URL_SIZE);

        mysep = strrchr(this_query->query, '/');

        // TODO: ignore return code for now
        web_client_api_request_v1(localhost, w, mysep ? mysep + 1 : "noop");

        //TODO: handle bad response perhaps in a different way. For now it does to the payload
        //if (rc == HTTP_RESP_OK || 1) {
            buffer_flush(aclk_buffer);

            aclk_create_metadata_message(aclk_buffer, mysep ? mysep + 1 : "noop", this_query->msg_id, w->response.data);
            aclk_buffer->contenttype = CT_APPLICATION_JSON;
            aclk_send_message(this_query->topic, aclk_buffer->buffer);
        //} else
        //   error("Query RESP: %s", w->response.data->buffer);

        buffer_free(w->response.data);
        freez(w);
        aclk_query_free(this_query);
        return 1;
    }

    if (strcmp((char *)this_query->topic, "_chart") == 0) {
        aclk_send_single_chart(this_query->data, this_query->query);
    }

    aclk_query_free(this_query);

    return 1;
}

// Launch a query processing thread

/*
 * Process all pending queries
 * Return 0 if no queries were processed, 1 otherwise
 *
 */

int aclk_process_queries()
{
    if (unlikely(cmdpause))
        return 0;

    // Return if no queries pending
    if (likely(!aclk_queue.count))
        return 0;

    info("Processing %d queries", (int ) aclk_queue.count);

    while (aclk_process_query()) {
        //rc = _link_event_loop(0);
    };

    return 1;
}

static void aclk_query_thread_cleanup(void *ptr)
{
    struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr;
    static_thread->enabled = NETDATA_MAIN_THREAD_EXITING;

    info("cleaning up...");

    static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
}

/**
 * MAin query processing thread
 *
 */
void *aclk_query_main_thread(void *ptr)
{
    netdata_thread_cleanup_push(aclk_query_thread_cleanup, ptr);

    while (!netdata_exit) {

        QUERY_THREAD_LOCK;

        if (unlikely(!aclk_metadata_submitted)) {
            aclk_send_metadata();
            aclk_metadata_submitted = 1;
        }

        if (unlikely(pthread_cond_wait(&query_cond_wait, &query_lock_wait)))
            sleep_usec(USEC_PER_SEC * 1);

        if (likely(aclk_connection_initialized && !netdata_exit)) {
            while (aclk_process_queries()) {
                // Sleep for a few ms and retry maybe we have something to process
                // before going to sleep
                // TODO: This needs improvement to avoid missed queries
                sleep_usec(USEC_PER_MS * 100);
            }
        }

        QUERY_THREAD_UNLOCK;

    } // forever
    info("Shutting down query processing thread");
    netdata_thread_cleanup_pop(1);
    return NULL;
}

// Thread cleanup
static void aclk_main_cleanup(void *ptr)
{
    struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr;
    static_thread->enabled = NETDATA_MAIN_THREAD_EXITING;

    info("cleaning up...");

    QUERY_THREAD_WAKEUP;

    static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
}

/**
 * Main agent cloud link thread
 *
 * This thread will simply call the main event loop that handles
 * pending requests - both inbound and outbound
 *
 * @param ptr is a pointer to the netdata_static_thread structure.
 *
 * @return It always returns NULL
 */
void *aclk_main(void *ptr)
{
    //netdata_thread_t *query_thread;
    struct netdata_static_thread query_thread;

    memset(&query_thread, 0, sizeof(query_thread));

    netdata_thread_cleanup_push(aclk_main_cleanup, ptr);

    if (unlikely(!aclk_buffer))
        aclk_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);

    assert(aclk_buffer != NULL);

    //netdata_thread_cleanup_push(aclk_query_thread_cleanup, ptr);
    //netdata_thread_create(&query_thread.thread , "ACLKQ", NETDATA_THREAD_OPTION_DEFAULT, aclk_query_main_thread, &query_thread);
    info("Waiting for netdata to be ready");
    while (!netdata_ready) {
        sleep_usec(USEC_PER_MS * 300);
    }
    info("Waiting %d seconds for the agent to initialize", ACLK_STARTUP_WAIT);
    sleep_usec(USEC_PER_SEC * ACLK_STARTUP_WAIT);

    // Ok mark we are ready to accept incoming requests
    waiting_init = 0;

    while (!netdata_exit) {
        // TODO: This may change when we have enough info from the claiming itself to avoid wasting 60 seconds
        // TODO: Handle the unclaim command as well -- we may need to shutdown the connection
        if (likely(!is_agent_claimed())) {
            sleep_usec(USEC_PER_SEC * 60);
            info("Checking agent claiming status");
            continue;
        }

        if (unlikely(!aclk_connection_initialized)) {
            static int initializing = 0;

            if (likely(initializing)) {
                _link_event_loop(ACLK_LOOP_TIMEOUT * 1000);
                continue;
            }
            initializing = 1;
            info("Initializing connection");
            //send_http_request(aclk_hostname, "443", "/auth/challenge?id=blah", aclk_buffer);
            if (unlikely(aclk_init(ACLK_INIT))) {
                // TODO: TBD how to handle. We are claimed and we cant init the connection. For now keep trying.
                sleep_usec(USEC_PER_SEC * 60);
                continue;
            } else {
                sleep_usec(USEC_PER_SEC * 1);
            }
            _link_event_loop(ACLK_LOOP_TIMEOUT * 1000);
            continue;
        }

        if (unlikely(!aclk_subscribed) && aclk_mqtt_connected) {
            aclk_subscribed = !aclk_subscribe(ACLK_COMMAND_TOPIC, 2);
        }
        if (unlikely(!query_thread.thread && aclk_mqtt_connected)) {
            query_thread.thread = mallocz(sizeof(netdata_thread_t));
            netdata_thread_create(
                query_thread.thread, "ACLKQ", NETDATA_THREAD_OPTION_DEFAULT, aclk_query_main_thread, &query_thread);
        }

        //TODO: Check if there is a return code
        _link_event_loop(ACLK_LOOP_TIMEOUT * 1000);

    } // forever
    aclk_shutdown();

    netdata_thread_cleanup_pop(1);
    return NULL;
}

/*
 * Send a message to the cloud, using a base topic and sib_topic
 * The final topic will be in the form <base_topic>/<sub_topic>
 * If base_topic is missing then the global_base_topic will be used (if available)
 *
 */
int aclk_send_message(char *sub_topic, char *message)
{
    int rc;
    static int skip_due_to_shutdown = 0;
    char topic[ACLK_MAX_TOPIC + 1];
    char *final_topic;

    if (!aclk_connection_initialized)
        return 0;

    if (unlikely(netdata_exit)) {
        if (unlikely(!aclk_connection_initialized))
            return 1;

        ++skip_due_to_shutdown;
        if (unlikely(!(skip_due_to_shutdown % 100)))
            info("%d messages not sent -- shutdown in progress", skip_due_to_shutdown);
        return 1;
    }

    if (unlikely(!message))
        return 0;

    if (unlikely(aclk_wait_for_initialization()))
        return 1;

    final_topic = get_topic(sub_topic, topic, ACLK_MAX_TOPIC);

    ACLK_LOCK;
    rc = _link_send_message(final_topic, message);
    ACLK_UNLOCK;

    // TODO: Add better handling -- error will flood the logfile here
    if (unlikely(rc))
        error("Failed to send message, error code %d (%s)", rc, _link_strerror(rc));

    return rc;
}

/*
 * Subscribe to a topic in the cloud
 * The final subscription will be in the form
 * /agent/claim_id/<sub_topic>
 */
int aclk_subscribe(char *sub_topic, int qos)
{
    int rc;
    //static char *global_base_topic = NULL;
    char topic[ACLK_MAX_TOPIC + 1];
    char *final_topic;

    if (!aclk_connection_initialized)
        return 0;

    if (unlikely(netdata_exit)) {
        return 1;
    }

    if (unlikely(aclk_wait_for_initialization()))
        return 1;

    final_topic = get_topic(sub_topic, topic, ACLK_MAX_TOPIC);

    ACLK_LOCK;
    rc = _link_subscribe(final_topic, qos);
    ACLK_UNLOCK;

    // TODO: Add better handling -- error will flood the logfile here
    if (unlikely(rc))
        error("Failed to send message, error code %d (%s)", rc, _link_strerror(rc));

    return rc;
}

// This is called from a callback when the link goes up
void aclk_connect(void *ptr)
{
    (void) ptr;
    info("Connection detected");
    return;
}

// This is called from a callback when the link goes down
void aclk_disconnect(void *ptr)
{
    (void) ptr;
    info("Disconnect detected");
    aclk_subscribed = 0;
    aclk_metadata_submitted = 0;
}

void aclk_shutdown()
{
    info("Shutdown initiated");
    aclk_connection_initialized = 0;
    _link_shutdown();
    info("Shutdown complete");
}

int aclk_init(ACLK_INIT_ACTION action)
{
    (void) action;

    static int init = 0;
    int rc;

    if (likely(init))
        return 0;

    aclk_send_maximum = config_get_number(CONFIG_SECTION_ACLK, "agent cloud link send maximum", 20);
    aclk_recv_maximum = config_get_number(CONFIG_SECTION_ACLK, "agent cloud link receive maximum", 20);

    aclk_hostname = config_get(CONFIG_SECTION_ACLK, "agent cloud link hostname", "localhost");
    aclk_port = config_get_number(CONFIG_SECTION_ACLK, "agent cloud link port", 9002);

    info("Maximum parallel outgoing messages %d", aclk_send_maximum);
    info("Maximum parallel incoming messages %d", aclk_recv_maximum);

    // This will setup the base publish topic internally
    //get_publish_base_topic(PUBLICH_TOPIC_GET);

    // initialize the low level link to the cloud
    rc = _link_lib_init(aclk_hostname, aclk_port, aclk_connect, aclk_disconnect);
    if (unlikely(rc)) {
        error("Failed to initialize the agent cloud link library");
        return 1;
    }
    global_base_topic = GET_PUBLISH_BASE_TOPIC;
    init = 1;

    return 0;
}

// Use this to disable encoding of quotes and newlines so that
// MQTT subscriber can display more readable data on screen

void aclk_create_header(BUFFER *dest, char *type, char *msg_id)
{
    uuid_t uuid;
    char uuid_str[36 + 1];

    if (unlikely(!msg_id)) {
        uuid_generate(uuid);
        uuid_unparse(uuid, uuid_str);
        msg_id = uuid_str;
    }

    buffer_sprintf(
        dest,
        "\t{\"type\": \"%s\",\n"
        "\t\"msg-id\": \"%s\",\n"
        "\t\"version\": %s,\n"
        "\t\"payload\": ",
        type, msg_id, ACLK_VERSION);
}

#define EYE_FRIENDLY 1

// encapsulate contents into metadata message as per ACLK documentation
void aclk_create_metadata_message(BUFFER *dest, char *type, char *msg_id, BUFFER *contents)
{
#ifndef EYE_FRIENDLY
    char *tmp_buffer = mallocz(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
    char *src, *dst;
#endif

    buffer_sprintf(
        dest,
        "\t{\"type\": \"%s\",\n"
        "\t\"msg-id\": \"%s\",\n"
        "\t\"payload\": %s\n\t}",
        type, msg_id ? msg_id : "", contents->buffer);

#ifndef EYE_FRIENDLY
    //TODO: this is the initial escaping, It will expanded
    src = dest->buffer;
    dst = tmp_buffer;
    while (*src) {
        switch (*src) {
            case '0x0a':
            case '\n':
                *dst++ = '\\';
                *dst++ = 'n';
                break;
            case '\"':
                *dst++ = '\\';
                *dst++ = '\"';
                break;
            case '\'':
                *dst++ = '\\';
                *dst++ = '\"';
                break;
            default:
                *dst++ = *src;
        }
        src++;
    }
    *dst = '\0';

    buffer_flush(dest);
    buffer_sprintf(dest, "%s", tmp_buffer);

    freez(tmp_buffer);
#endif
    return;
}

//TODO: this has been changed in the latest specs. We need to pack the data in one MQTT
//message with a payload and has a list of json objects
int aclk_send_alarm_metadata()
{
    //TODO: improve locking on the buffer -- same lock is used for the message send
    //improve error handling
    ACLK_LOCK;
    buffer_flush(aclk_buffer);
    // Alarms configuration
    aclk_create_header(aclk_buffer, "alarms", NULL);
    health_alarms2json(localhost, aclk_buffer, 1);
    buffer_sprintf(aclk_buffer,"\n}");
    ACLK_UNLOCK;
    aclk_send_message(ACLK_ALARMS_TOPIC, aclk_buffer->buffer);

    // Alarms log
    ACLK_LOCK;
    buffer_flush(aclk_buffer);
    aclk_create_header(aclk_buffer, "alarms_log", NULL);
    health_alarm_log2json(localhost, aclk_buffer, 0);
    buffer_sprintf(aclk_buffer,"\n}");
    ACLK_UNLOCK;
    aclk_send_message(ACLK_ALARMS_TOPIC, aclk_buffer->buffer);

    return 0;
}


// Send info metadata message to the cloud if the link is established
// or on request
int aclk_send_metadata()
{
    ACLK_LOCK;

    buffer_flush(aclk_buffer);

    aclk_create_header(aclk_buffer, "connect", NULL);
    buffer_sprintf(aclk_buffer,"{\n\t \"info\" : ");
    web_client_api_request_v1_info_fill_buffer(localhost, aclk_buffer);
    buffer_sprintf(aclk_buffer,", \n\t \"charts\" : ");
    charts2json(localhost, aclk_buffer);
    buffer_sprintf(aclk_buffer,"\n}\n}");
    aclk_buffer->contenttype = CT_APPLICATION_JSON;

    ACLK_UNLOCK;

    aclk_send_message(ACLK_METADATA_TOPIC, aclk_buffer->buffer);

    aclk_send_alarm_metadata();

    return 0;
}

//rrd_stats_api_v1_chart(RRDSET *st, BUFFER *buf)

int aclk_send_single_chart(char *hostname, char *chart)
{
    RRDHOST *target_host;
    ACLK_LOCK;

    buffer_flush(aclk_buffer);

    target_host = rrdhost_find_by_hostname(hostname, 0);
    if (!target_host)
        return 1;

    RRDSET *st = rrdset_find(target_host, chart);

    if (!st)
        st = rrdset_find_byname(target_host, chart);

    if (!st) {
        info("FAILED to find chart %s", chart);
        return 1;
    }

    aclk_buffer->contenttype = CT_APPLICATION_JSON;

    buffer_flush(aclk_buffer);

    aclk_create_header(aclk_buffer, "chart", NULL);

    rrdset2json(st, aclk_buffer, NULL, NULL);
    buffer_sprintf(aclk_buffer,"\n}\n}");


    ACLK_UNLOCK;
    aclk_send_message(ACLK_METADATA_TOPIC, aclk_buffer->buffer);
    return 0;
}

int    aclk_update_chart(RRDHOST *host, char *chart_name)
{
    (void) host;
    (void) chart_name;
#ifndef ENABLE_ACLK
    return 0;
#else
    if (host != localhost)
        return 0;

    aclk_queue_query("_chart", host->hostname, NULL, chart_name, 2, 1);
    return 0;
#endif
}

int    aclk_update_alarm(RRDHOST *host, char *alarm_name)
{
    if (host != localhost)
        return 0;

    aclk_queue_query("_alarm", host->hostname, NULL, alarm_name, 2, 1);
    return 0;
}


//TODO: add and check the incoming type e.g http
int aclk_handle_cloud_request(char *payload)
{
    struct aclk_request cloud_to_agent = { .msg_id = NULL, .topic = NULL, .url = NULL, .version = 1};

    int rc = json_parse(payload, &cloud_to_agent, cloud_to_agent_parse);

    if (unlikely(JSON_OK != rc)) {
        error("Malformed json request (%s)", payload);
        return 1;
    }

    if (unlikely(!cloud_to_agent.url || !cloud_to_agent.topic)) {
        return 1;
    }

    aclk_submit_request(&cloud_to_agent);

    return 0;
}
